Coverage for cpprb/PyReplayBuffer.pyx: 96%


937 statements  

# distutils: language = c++
# cython: linetrace=True

import ctypes
from logging import getLogger, StreamHandler, Formatter, INFO
import multiprocessing as mp
import time
from typing import Any, Dict, Callable, Optional
import warnings

cimport numpy as np
import numpy as np
import cython
from cython.operator cimport dereference

from cpprb.ReplayBuffer cimport *
from cpprb.multiprocessing import RawArray, RawValue, _has_SharedMemory, try_start

from .VectorWrapper cimport *
from .VectorWrapper import (VectorWrapper,
                            VectorInt, VectorSize_t,
                            VectorDouble, PointerDouble, VectorFloat)


def default_logger(level=INFO):
    """
    Create default logger for cpprb
    """
    logger = getLogger("cpprb")
    logger.setLevel(level)

    if not logger.hasHandlers():
        handler = StreamHandler()
        handler.setLevel(level)

        fmt = Formatter("%(asctime)s.%(msecs)03d [%(levelname)s] " +
                        "(%(filename)s:%(lineno)s) %(message)s",
                        "%Y%m%d-%H%M%S")
        handler.setFormatter(fmt)
        logger.addHandler(handler)
        logger.propagate = False

    return logger

cdef double [::1] Cdouble(array):
    return np.ravel(np.array(array,copy=False,dtype=np.double,ndmin=1,order='C'))

cdef inline const size_t [::1] Csize(array):
    return np.ravel(np.array(array,copy=False,dtype=np.uint64,ndmin=1,order='C'))

@cython.embedsignature(True)
cdef inline const float [::1] Cfloat(array):
    return np.ravel(np.array(array,copy=False,dtype=np.single,ndmin=1,order='C'))


def unwrap(d):
    return d[np.newaxis][0]

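# Note: ``unwrap`` recovers an object stored as a 0-d object array, as
# produced by ``numpy.savez``/``numpy.load`` for non-array values (see
# ``_load_transitions_v1`` below). A hedged sketch with hypothetical values:
#
# >>> wrapped = np.array({"a": 1})  # 0-d object array
# >>> unwrap(wrapped)
# {'a': 1}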

@cython.embedsignature(True)
cdef class Environment:
    """
    Base class to store environment
    """
    cdef PointerDouble obs
    cdef PointerDouble act
    cdef PointerDouble rew
    cdef PointerDouble next_obs
    cdef PointerDouble done
    cdef size_t buffer_size
    cdef size_t obs_dim
    cdef size_t act_dim
    cdef size_t rew_dim
    cdef bool is_discrete_action
    cdef obs_shape

    def __cinit__(self,size,obs_dim=1,act_dim=1,*,
                  rew_dim=1,is_discrete_action = False,
                  obs_shape = None, **kwargs):
        self.obs_shape = obs_shape
        self.is_discrete_action = is_discrete_action

        cdef size_t _dim
        if self.obs_shape is None:
            self.obs_dim = obs_dim
        else:
            self.obs_dim = 1
            for _dim in self.obs_shape:
                self.obs_dim *= _dim

        self.act_dim = act_dim if not self.is_discrete_action else 1
        self.rew_dim = rew_dim

        self.obs = PointerDouble(ndim=2,value_dim=self.obs_dim,size=size)
        self.act = PointerDouble(ndim=2,value_dim=self.act_dim,size=size)
        self.rew = PointerDouble(ndim=2,value_dim=self.rew_dim,size=size)
        self.next_obs = PointerDouble(ndim=2,value_dim=self.obs_dim,size=size)
        self.done = PointerDouble(ndim=2,value_dim=1,size=size)

    def __init__(self,size,obs_dim=1,act_dim=1,*,
                 rew_dim=1,is_discrete_action = False,
                 obs_shape = None, **kwargs):
        """
        Parameters
        ----------
        size : int
            buffer size
        obs_dim : int, optional
            observation (obs) dimension. The default value is 1.
        act_dim : int, optional
            action (act) dimension. The default value is 1.
        rew_dim : int, optional
            reward (rew) dimension. The default value is 1.
        is_discrete_action : bool, optional
            If ``True`` (default is ``False``), act_dim is compressed to 1.
        obs_shape : array-like, optional
            observation shape. If not ``None``, overwrites obs_dim.
        """
        pass

    cdef size_t _add(self,double [::1] o,double [::1] a,double [::1] r,
                     double [::1] no,double [::1] d):
        raise NotImplementedError

    def add(self,obs,act,rew,next_obs,done):
        """
        Add environment(s) into replay buffer.
        Multiple step environments can be added.

        Parameters
        ----------
        obs : array_like or float or int
            observation(s)
        act : array_like or float or int
            action(s)
        rew : array_like or float or int
            reward(s)
        next_obs : array_like or float or int
            next observation(s)
        done : array_like or float or int
            done(s)

        Returns
        -------
        int
            the first stored index
        """
        return self._add(Cdouble(obs),Cdouble(act),Cdouble(rew),Cdouble(next_obs),Cdouble(done))

    def _encode_sample(self,idx):
        # ``np.int`` was removed from NumPy; the builtin ``int`` is equivalent.
        dtype = int if self.is_discrete_action else np.double

        _o = self.obs.as_numpy()[idx]
        _no = self.next_obs.as_numpy()[idx]
        if self.obs_shape is not None:
            _shape = (-1,*self.obs_shape)
            _o = _o.reshape(_shape)
            _no = _no.reshape(_shape)

        return {'obs': _o,
                'act': self.act.as_numpy(dtype=dtype)[idx],
                'rew': self.rew.as_numpy()[idx],
                'next_obs': _no,
                'done': self.done.as_numpy()[idx]}

    cpdef size_t get_buffer_size(self):
        """
        Get buffer size

        Returns
        -------
        size_t
            buffer size
        """
        return self.buffer_size

    cdef void _update_size(self,size_t new_size):
        """ Update environment size

        Parameters
        ----------
        new_size : size_t
            new size to set for each environment value (obs,act,rew,next_obs,done)
        """
        self.obs.update_vec_size(new_size)
        self.act.update_vec_size(new_size)
        self.rew.update_vec_size(new_size)
        self.next_obs.update_vec_size(new_size)
        self.done.update_vec_size(new_size)

    cpdef size_t get_obs_dim(self):
        """Return observation dimension (obs_dim)
        """
        return self.obs_dim

    def get_obs_shape(self):
        """Return observation shape
        """
        return self.obs_shape

    cpdef size_t get_act_dim(self):
        """Return action dimension (act_dim)
        """
        return self.act_dim

    cpdef size_t get_rew_dim(self):
        """Return reward dimension (rew_dim)
        """
        return self.rew_dim


@cython.embedsignature(True)
cdef class SelectiveEnvironment(Environment):
    """
    Base class for episode-level management environment
    """
    cdef CppSelectiveEnvironment[double,double,double,double] *buffer
    def __cinit__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
        self.buffer_size = episode_len * Nepisodes

        self.buffer = new CppSelectiveEnvironment[double,double,
                                                  double,double](episode_len,
                                                                 Nepisodes,
                                                                 self.obs_dim,
                                                                 self.act_dim,
                                                                 self.rew_dim)

        self.buffer.get_buffer_pointers(self.obs.ptr,
                                        self.act.ptr,
                                        self.rew.ptr,
                                        self.next_obs.ptr,
                                        self.done.ptr)

    def __init__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
        """
        Parameters
        ----------
        episode_len : int
            the max size of environments in a single episode
        obs_dim : int
            observation (obs, next_obs) dimension
        act_dim : int
            action (act) dimension
        Nepisodes : int, optional
            the max number of stored episodes
        rew_dim : int, optional
            reward (rew) dimension
        """
        pass

    cdef size_t _add(self,double [::1] obs,double [::1] act, double [::1] rew,
                     double [::1] next_obs, double [::1] done):
        return self.buffer.store(&obs[0],&act[0],&rew[0],
                                 &next_obs[0],&done[0],done.shape[0])

    cpdef void clear(self) except *:
        """
        Clear replay buffer.
        """
        clear(self.buffer)

    cpdef size_t get_stored_size(self):
        """
        Get stored size

        Returns
        -------
        size_t
            stored size
        """
        return get_stored_size(self.buffer)

    cpdef size_t get_next_index(self):
        """
        Get the next index to store

        Returns
        -------
        size_t
            the next index to store
        """
        return get_next_index(self.buffer)

    cpdef size_t get_stored_episode_size(self):
        """
        Get the number of stored episodes

        Returns
        -------
        size_t
            the number of stored episodes
        """
        return self.buffer.get_stored_episode_size()

    cpdef size_t delete_episode(self,i):
        """
        Delete the specified episode

        The environments stored after the specified episode are moved backward.

        Parameters
        ----------
        i : int
            the index of the episode to delete

        Returns
        -------
        size_t
            the number of environments in the deleted episode
        """
        return self.buffer.delete_episode(i)

    def get_episode(self,i):
        """
        Get the specified episode

        Parameters
        ----------
        i : int
            the index of the episode to extract

        Returns
        -------
        dict of ndarray
            the set of environments in the i-th episode
        """
        cdef size_t len = 0
        self.buffer.get_episode(i,len,
                                self.obs.ptr,self.act.ptr,self.rew.ptr,
                                self.next_obs.ptr,self.done.ptr)
        if len == 0:
            return {'obs': np.ndarray((0,self.obs_dim)),
                    'act': np.ndarray((0,self.act_dim)),
                    'rew': np.ndarray((0,self.rew_dim)),
                    'next_obs': np.ndarray((0,self.obs_dim)),
                    'done': np.ndarray(0)}

        self._update_size(len)
        return {'obs': self.obs.as_numpy(),
                'act': self.act.as_numpy(),
                'rew': self.rew.as_numpy(),
                'next_obs': self.next_obs.as_numpy(),
                'done': self.done.as_numpy()}

    def _encode_sample(self,indexes):
        self.buffer.get_buffer_pointers(self.obs.ptr,
                                        self.act.ptr,
                                        self.rew.ptr,
                                        self.next_obs.ptr,
                                        self.done.ptr)
        cdef size_t buffer_size = self.get_buffer_size()
        self._update_size(buffer_size)
        return super()._encode_sample(indexes)

@cython.embedsignature(True)
cdef class SelectiveReplayBuffer(SelectiveEnvironment):
    """
    Replay buffer to store episodes of environment.

    This class can get and delete an episode.
    """
    def __cinit__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
        pass

    def __init__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
        """
        Parameters
        ----------
        episode_len : int
            the max size of a single episode
        obs_dim : int
            observation (obs, next_obs) dimension
        act_dim : int
            action (act) dimension
        Nepisodes : int, optional
            the max number of stored episodes. The default value is 10.
        rew_dim : int, optional
            reward (rew) dimension. The default value is 1.
        """
        pass

    def sample(self,batch_size):
        """
        Sample the stored environments randomly with specified size

        Parameters
        ----------
        batch_size : int
            sampled batch size

        Returns
        -------
        dict of ndarray
            Sampled batch transitions, which might contain the same transition
            multiple times.
        """
        cdef idx = np.random.randint(0,self.get_stored_size(),batch_size)
        return self._encode_sample(idx)

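# A hedged usage sketch of ``SelectiveReplayBuffer`` with hypothetical sizes
# and values; episodes are stored step by step, then fetched or deleted by
# episode index:
#
# >>> srb = SelectiveReplayBuffer(episode_len=100, obs_dim=3, act_dim=1, Nepisodes=4)
# >>> srb.add(obs=[1,2,3], act=0.5, rew=1.0, next_obs=[2,3,4], done=0.0)
# >>> ep = srb.get_episode(0)          # dict with "obs", "act", "rew", ...
# >>> n = srb.delete_episode(0)        # later episodes are moved backward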

cdef class SharedBuffer:
    cdef dtype
    cdef data
    cdef data_ndarray
    cdef view
    cdef backend
    def __init__(self, shape, dtype, data=None, ctx=None, backend="sharedctypes"):
        self.dtype = np.dtype(dtype)
        ctx = ctx or mp.get_context()
        self.backend = backend

        if data is None:
            try:
                ctype = np.ctypeslib.as_ctypes_type(self.dtype)
            except NotImplementedError:
                # Dirty hack to allocate shared memory of the correct size
                for d in (np.int8, np.int16, np.int32, np.int64):
                    _d = np.dtype(d)

                    if self.dtype.itemsize == _d.itemsize:
                        ctype = np.ctypeslib.as_ctypes_type(_d)
                        break
                else:
                    raise

            len = int(np.array(shape,copy=False,dtype="int").prod())
            self.data = RawArray(ctx,ctype,len,self.backend)
        else:
            self.data = data

        self.data_ndarray = self.data.ndarray
        self.data_ndarray.shape = shape

        # Reinterpretation
        if self.dtype != self.data_ndarray.dtype:
            self.view = self.data_ndarray.view(self.dtype)
        else:
            self.view = self.data_ndarray


    def __getitem__(self,key):
        return self.view[key]

    def __setitem__(self,key,value):
        self.view[key] = value

    def __reduce__(self):
        return (SharedBuffer,(self.view.shape,self.dtype,self.data,None,self.backend))

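# ``__reduce__`` above makes ``SharedBuffer`` picklable by rebuilding the view
# over the same shared ``data``, so child processes see the parent's writes.
# A hedged sketch (hypothetical worker function):
#
# >>> sb = SharedBuffer((4,), np.float32)
# >>> def worker(buf):        # ``buf`` is reconstructed via __reduce__
# ...     buf[1] = 2.0
# >>> p = mp.get_context().Process(target=worker, args=(sb,))
# >>> p.start(); p.join()
# >>> sb[1]
# 2.0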

def dict2buffer(buffer_size: int,env_dict: Dict,*,
                stack_compress = None, default_dtype = None,
                mmap_prefix: Optional[str] = None,
                shared: Optional[str] = None,
                ctx = None):
    """Create buffer from env_dict

    Parameters
    ----------
    buffer_size : int
        buffer size
    env_dict : dict of dict
        Specify environment values to be stored in buffer.
    stack_compress : str or array-like of str, optional
        compress memory of specified stacked values.
    default_dtype : numpy.dtype, optional
        fallback dtype for values not specified in `env_dict`. default is numpy.single
    mmap_prefix : str, optional
        File name prefix to save buffer data using mmap. If `None` (default),
        save only on memory.
    shared : str, optional
        If not `None`, allocate the buffer on shared memory with the
        specified backend.
    ctx : multiprocessing context, optional
        Context used for shared memory allocation.

    Returns
    -------
    buffer : dict of numpy.ndarray
        buffer for environment specified by env_dict.
    """
    cdef buffer = {}
    cdef bool compress_any = (stack_compress is not None)
    default_dtype = default_dtype or np.single

    def zeros(name,shape,dtype):
        if shared:
            return SharedBuffer(shape,dtype, ctx=ctx, backend=shared)

        if mmap_prefix:
            if not isinstance(shape,tuple):
                shape = tuple(shape)
            return np.memmap(f"{mmap_prefix}_{name}.dat",
                             shape=shape,dtype=dtype,mode="w+")
        else:
            return np.zeros(shape=shape,dtype=dtype)

    for name, defs in env_dict.items():
        shape = np.insert(np.asarray(defs.get("shape",1)),0,buffer_size)

        if compress_any and np.isin(name,
                                    stack_compress,
                                    assume_unique=True).any():
            # Allocate a flat (unstacked) region, then expose the stacked
            # view through stride tricks so that overlapping stack frames
            # share memory.
            buffer_shape = np.insert(np.delete(shape,-1),1,shape[-1])
            buffer_shape[0] += buffer_shape[1] - 1
            buffer_shape[1] = 1
            memory = zeros(name, buffer_shape,
                           dtype=defs.get("dtype",default_dtype))
            strides = np.append(np.delete(memory.strides,1),memory.strides[1])
            buffer[name] = np.lib.stride_tricks.as_strided(memory,
                                                           shape=shape,
                                                           strides=strides)
        else:
            buffer[name] = zeros(name,shape,dtype=defs.get("dtype",default_dtype))

        # Fill with 1, touching every page of the newly allocated memory.
        buffer[name][:] = 1

        shape[0] = -1
        defs["add_shape"] = shape

    return buffer

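# A hedged sketch of the ``stack_compress`` layout above: for a stacked value
# with ``"shape": 2`` and ``buffer_size=4``, only ``4 + 2 - 1 = 5`` elements
# are physically allocated, and the per-index stacks are overlapping strided
# views (hypothetical call):
#
# >>> buf = dict2buffer(4, {"obs": {"shape": 2}}, stack_compress="obs")
# >>> buf["obs"].shape
# (4, 2)
# >>> # buf["obs"][1, 0] aliases buf["obs"][0, 1] (same underlying element)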

def find_array(dict,key):
    """Find 'key' and ensure numpy.ndarray with the minimum dimension of 1.

    Parameters
    ----------
    dict : dict
        dict in which to find 'key'
    key : str
        dictionary key to find

    Returns
    -------
    : numpy.ndarray or None
        If `dict` has `key`, returns the value as numpy.ndarray with the minimum
        dimension of 1. Otherwise, returns `None`.
    """
    return None if key not in dict else np.array(dict[key],ndmin=1,copy=False)

@cython.embedsignature(True)
cdef class StepChecker:
    """Check the step size of addition
    """
    cdef check_str
    cdef check_shape

    def __init__(self,env_dict,special_keys = None):
        """Initialize StepChecker class.

        Parameters
        ----------
        env_dict : dict
            Specify the environment values.
        special_keys : list of str, optional
            Keys to be excluded from the check.
        """
        special_keys = special_keys or []
        # Any non-special key suffices for the check; the last one wins.
        for name, defs in env_dict.items():
            if name in special_keys:
                continue
            self.check_str = name
            self.check_shape = defs["add_shape"]

    cdef size_t step_size(self,kwargs) except *:
        """Return step size.

        Parameters
        ----------
        kwargs : dict
            Added values.
        """
        return np.reshape(np.array(kwargs[self.check_str], copy=False),
                          self.check_shape,order='A').shape[0]

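# A hedged illustration of ``StepChecker.step_size`` semantics: the tracked
# value is reshaped to its registered ``add_shape`` (leading dimension -1),
# and the resulting first dimension is the step count (hypothetical shapes):
#
# >>> # with env_dict = {"obs": {"add_shape": [-1, 3]}}
# >>> # step_size({"obs": [1, 2, 3]})              -> 1
# >>> # step_size({"obs": [[1, 2, 3], [4, 5, 6]]}) -> 2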

@cython.embedsignature(True)
cdef class NstepBuffer:
    """Local buffer class for Nstep reward.

    This buffer temporarily stores environment values and returns Nstep-modified
    environment values for `ReplayBuffer`
    """
    cdef buffer
    cdef size_t buffer_size
    cdef default_dtype
    cdef size_t stored_size
    cdef size_t Nstep_size
    cdef float Nstep_gamma
    cdef Nstep_rew
    cdef Nstep_next
    cdef env_dict
    cdef stack_compress
    cdef StepChecker size_check

    def __cinit__(self,env_dict=None,Nstep=None,*,
                  stack_compress = None,default_dtype = None,next_of = None):
        self.env_dict = env_dict.copy() if env_dict else {}
        self.stored_size = 0
        self.stack_compress = None # stack_compress is not supported yet.
        self.default_dtype = default_dtype or np.single

        if next_of is not None: # next_of is not supported yet.
            for name in np.array(next_of,copy=False,ndmin=1):
                self.env_dict[f"next_{name}"] = self.env_dict[name]

        self.Nstep_size = Nstep["size"]
        self.Nstep_gamma = Nstep.get("gamma",0.99)
        self.Nstep_rew = find_array(Nstep,"rew")
        self.Nstep_next = find_array(Nstep,"next")

        self.buffer_size = self.Nstep_size - 1
        self.buffer = dict2buffer(self.buffer_size,self.env_dict,
                                  stack_compress = self.stack_compress,
                                  default_dtype = self.default_dtype)
        self.size_check = StepChecker(self.env_dict)

    def __init__(self,env_dict=None,Nstep=None,*,
                 stack_compress = None,default_dtype = None, next_of = None):
        r"""Initialize NstepBuffer class.

        Parameters
        ----------
        env_dict : dict
            Specify environment values to be stored.
        Nstep : dict
            `Nstep["size"]` is `int` specifying the step size of the Nstep reward.
            `Nstep["rew"]` is `str` or array-like of `str` specifying
            Nstep rewards to be summed. `Nstep["gamma"]` is a float specifying
            the discount factor, whose default is 0.99. `Nstep["next"]` is `str`
            or list of `str` specifying next values to be moved.
        stack_compress : str or array-like of str, optional
            compress memory of specified stacked values.
        default_dtype : numpy.dtype, optional
            fallback dtype for values not specified in `env_dict`. default is numpy.single
        next_of : str or array-like of str, optional
            next items of the specified environment values (e.g. next_obs for obs)
            are also sampled without duplicated values.

        Notes
        -----
        Currently, the memory compression features (`stack_compress` and `next_of`)
        are not supported yet. (They fall back to plain storing.)
        """
        pass

    def add(self,**kwargs):
        r"""Add environment into local buffer.

        Parameters
        ----------
        **kwargs : keyword arguments
            Values to be added.

        Returns
        -------
        env : dict or None
            Values with Nstep reward calculated. When the local buffer does not
            store enough cache items, returns ``None``.
        """
        cdef size_t N = self.size_check.step_size(kwargs)
        cdef ssize_t end = self.stored_size + N

        cdef ssize_t i
        cdef ssize_t stored_begin
        cdef ssize_t stored_end
        cdef ssize_t ext_begin
        cdef ssize_t max_slide

        # Case 1
        # If the Nstep buffer does not become full, store all the input
        # transitions. These transitions are partially calculated.
        if end <= self.buffer_size:
            for name, stored_b in self.buffer.items():
                if self.Nstep_rew is not None and np.isin(name,self.Nstep_rew).any():
                    # Calculate later.
                    pass
                elif (self.Nstep_next is not None
                      and np.isin(name,self.Nstep_next).any()):
                    # Do nothing.
                    pass
                else:
                    stored_b[self.stored_size:end] = self._extract(kwargs,name)

            # Nstep reward must be calculated after "done" filling
            gamma = (1.0 - self.buffer["done"][:end]) * self.Nstep_gamma

            if self.Nstep_rew is not None:
                max_slide = min(self.Nstep_size - self.stored_size,N)
                max_slide *= -1
                for name in self.Nstep_rew:
                    ext_b = self._extract(kwargs,name).copy()
                    self.buffer[name][self.stored_size:end] = ext_b

                    for i in range(self.stored_size-1,max_slide,-1):
                        stored_begin = max(i,0)
                        stored_end = i+N
                        ext_begin = max(-i,0)
                        ext_b[ext_begin:] *= gamma[stored_begin:stored_end]
                        self.buffer[name][stored_begin:stored_end] += ext_b[ext_begin:]

            self.stored_size = end
            return None

        # Case 2
        # If we have enough transitions, return calculated transitions
        cdef size_t diff_N = self.buffer_size - self.stored_size
        cdef size_t add_N = N - diff_N
        cdef bool NisBigger = (add_N > self.buffer_size)
        end = self.buffer_size if NisBigger else add_N

        # Nstep reward must be calculated before "done" filling
        cdef ssize_t spilled_N
        gamma = np.ones((self.stored_size + N,1),dtype=np.single)
        gamma[:self.stored_size] -= self.buffer["done"][:self.stored_size]
        gamma[self.stored_size:] -= self._extract(kwargs,"done")
        gamma *= self.Nstep_gamma
        if self.Nstep_rew is not None:
            max_slide = min(self.Nstep_size - self.stored_size,N)
            max_slide *= -1
            for name in self.Nstep_rew:
                stored_b = self.buffer[name]
                ext_b = self._extract(kwargs,name)

                copy_ext = ext_b.copy()
                if diff_N:
                    stored_b[self.stored_size:] = ext_b[:diff_N]
                    ext_b = ext_b[diff_N:]

                for i in range(self.stored_size-1,max_slide,-1):
                    stored_begin = max(i,0)
                    stored_end = i+N
                    ext_begin = max(-i,0)
                    copy_ext[ext_begin:] *= gamma[stored_begin:stored_end]
                    if stored_end <= self.buffer_size:
                        stored_b[stored_begin:stored_end] += copy_ext[ext_begin:]
                    else:
                        spilled_N = stored_end - self.buffer_size
                        stored_b[stored_begin:] += copy_ext[ext_begin:-spilled_N]
                        ext_b[:spilled_N] += copy_ext[-spilled_N:]

                self._roll(stored_b,ext_b,end,NisBigger,kwargs,name,add_N)

        for name, stored_b in self.buffer.items():
            if self.Nstep_rew is not None and np.isin(name,self.Nstep_rew).any():
                # Already calculated.
                pass
            elif (self.Nstep_next is not None
                  and np.isin(name,self.Nstep_next).any()):
                kwargs[name] = self._extract(kwargs,name)[diff_N:]
            else:
                ext_b = self._extract(kwargs,name)

                if diff_N:
                    stored_b[self.stored_size:] = ext_b[:diff_N]
                    ext_b = ext_b[diff_N:]

                self._roll(stored_b,ext_b,end,NisBigger,kwargs,name,add_N)

        done = kwargs["done"]

        for i in range(1,self.buffer_size):
            if i <= add_N:
                done[:-i] += kwargs["done"][i:]
                done[-i:] += self.buffer["done"][:i]
            else:
                done += self.buffer["done"][i-add_N:i]

        self.stored_size = self.buffer_size
        return kwargs

    cdef _extract(self,kwargs,name):
        _dict = self.env_dict[name]
        return np.reshape(np.array(kwargs[name],copy=False,ndmin=2,
                                   dtype=_dict.get("dtype",self.default_dtype)),
                          _dict["add_shape"])

    cdef void _roll(self,stored_b,ext_b,
                    ssize_t end,bool NisBigger,kwargs,name,size_t add_N):
        # Swap numpy.ndarray contents
        copy_ext = ext_b.copy() # ext_b might be unwritable, so copy it.
        copy_ext[-end:] = stored_b[:end]
        stored_b[:end] = ext_b[-end:]

        if NisBigger:
            # buffer: XXXX, add: YYYYY
            # buffer: YYYY, add: YXXXX
            copy_ext = np.roll(copy_ext,end,axis=0)
            # buffer: YYYY, add: XXXXY
        else:
            # buffer: XXXZZZZ, add: YYY
            # buffer: YYYZZZZ, add: XXX
            stored_b[:] = np.roll(stored_b,-end,axis=0)[:]
            # buffer: ZZZZYYY, add: XXX
        kwargs[name] = copy_ext[:add_N]

    cpdef void clear(self):
        """Clear the buffer.
        """
        self.stored_size = 0

    cpdef on_episode_end(self):
        """Terminate episode.
        """
        kwargs = {k: v[:self.stored_size].copy() for k, v in self.buffer.items()}
        done = kwargs["done"]

        for i in range(1,self.stored_size):
            done[:-i] += kwargs["done"][i:]

        self.clear()
        return kwargs

    cpdef size_t get_Nstep_size(self):
        """Get Nstep size

        Returns
        -------
        Nstep_size : size_t
            Nstep size
        """
        return self.Nstep_size


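# A hedged worked example of the Nstep reward computed above: with
# ``Nstep = {"size": 3, "gamma": 0.99, "rew": "rew", "next": "next_obs"}``,
# the returned reward for step t is the discounted 3-step sum, truncated
# at ``done``:
#
#     rew_t = r_t + 0.99 * r_{t+1} + 0.99**2 * r_{t+2}
#
# e.g. rewards (1, 1, 1) with no intervening ``done`` yield
# 1 + 0.99 + 0.9801 = 2.9701 for the first returned transition, and
# ``next_obs`` is shifted to the observation 3 steps ahead.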

cdef class RingBufferIndex:
    """Ring Buffer Index class
    """
    cdef index
    cdef buffer_size
    cdef is_full

    def __init__(self, buffer_size, ctx = None, backend = "sharedctypes"):
        ctx = ctx or mp.get_context()
        self.index = RawValue(ctx, ctypes.c_size_t, 0, backend)
        self.buffer_size = RawValue(ctx, ctypes.c_size_t, buffer_size, backend)
        self.is_full = RawValue(ctx, ctypes.c_int, 0, backend)

    cdef size_t get_next_index(self):
        return self.index.value

    cdef size_t fetch_add(self,size_t N):
        """
        Add, then return the original value

        Parameters
        ----------
        N : size_t
            value to add

        Returns
        -------
        size_t
            index before the addition
        """
        cdef size_t ret = self.index.value
        self.index.value += N

        if self.index.value >= self.buffer_size.value:
            self.is_full.value = 1

            while self.index.value >= self.buffer_size.value:
                self.index.value -= self.buffer_size.value

        return ret

    cdef void clear(self):
        self.index.value = 0
        self.is_full.value = 0

    cdef size_t get_stored_size(self):
        if self.is_full.value:
            return self.buffer_size.value
        else:
            return self.index.value


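# A hedged trace of ``RingBufferIndex.fetch_add`` wraparound with
# ``buffer_size = 5`` (hypothetical calls):
#
#     fetch_add(3) -> returns 0; index becomes 3; not full
#     fetch_add(3) -> returns 3; index wraps to 1; is_full = 1
#     get_stored_size() -> 5, get_next_index() -> 1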

cdef class ProcessSafeRingBufferIndex(RingBufferIndex):
    """Process-safe Ring Buffer Index class
    """
    cdef lock

    def __init__(self, buffer_size, ctx=None, backend="sharedctypes"):
        ctx = ctx or mp.get_context()
        super().__init__(buffer_size, ctx, backend)
        self.lock = ctx.Lock()

    cdef size_t get_next_index(self):
        with self.lock:
            return RingBufferIndex.get_next_index(self)

    cdef size_t fetch_add(self,size_t N):
        with self.lock:
            return RingBufferIndex.fetch_add(self,N)

    cdef void clear(self):
        with self.lock:
            RingBufferIndex.clear(self)

    cdef size_t get_stored_size(self):
        with self.lock:
            return RingBufferIndex.get_stored_size(self)



@cython.embedsignature(True)
cdef class ReplayBuffer:
    r"""Replay Buffer class to store transitions and to sample them randomly.

    The transition can contain anything compatible with NumPy data
    types. Users can specify the contents freely via the ``env_dict``
    constructor parameter.

    A standard transition might contain observation (``obs``),
    action (``act``), reward (``rew``), the next observation
    (``next_obs``), and done (``done``).

    >>> env_dict = {"obs": {"shape": (4,4)},
    ...             "act": {"shape": 3, "dtype": np.int16},
    ...             "rew": {},
    ...             "next_obs": {"shape": (4,4)},
    ...             "done": {}}

    In this class, sampling is random sampling and the same transition
    can be chosen multiple times."""
    cdef buffer
    cdef size_t buffer_size
    cdef env_dict
    cdef RingBufferIndex index
    cdef size_t episode_len
    cdef next_of
    cdef bool has_next_of
    cdef next_
    cdef bool compress_any
    cdef stack_compress
    cdef cache
    cdef default_dtype
    cdef StepChecker size_check
    cdef NstepBuffer nstep
    cdef bool use_nstep
    cdef size_t cache_size

    def __cinit__(self,size,env_dict=None,*,
                  next_of=None,stack_compress=None,default_dtype=None,Nstep=None,
                  mmap_prefix=None,
                  **kwargs):
        self.env_dict = env_dict.copy() if env_dict else {}
        cdef special_keys = []

        self.buffer_size = size
        self.index = RingBufferIndex(self.buffer_size)
        self.episode_len = 0

        self.compress_any = stack_compress
        self.stack_compress = np.array(stack_compress,ndmin=1,copy=False)

        self.default_dtype = default_dtype or np.single

        self.has_next_of = next_of
        self.next_of = np.array(next_of,
                                ndmin=1,copy=False) if self.has_next_of else None
        self.next_ = {}
        self.cache = {} if (self.has_next_of or self.compress_any) else None

        self.use_nstep = Nstep
        if self.use_nstep:
            self.nstep = NstepBuffer(self.env_dict,Nstep.copy(),
                                     stack_compress = self.stack_compress,
                                     next_of = self.next_of,
                                     default_dtype = self.default_dtype)

            # Nstep does not support next_of yet
            self.next_of = None
            self.has_next_of = False

        # side effect: adds the "add_shape" key into self.env_dict
        self.buffer = dict2buffer(self.buffer_size,self.env_dict,
                                  stack_compress = self.stack_compress,
                                  default_dtype = self.default_dtype,
                                  mmap_prefix = mmap_prefix)

        self.size_check = StepChecker(self.env_dict,special_keys)

        # Cache Size:
        #   No "next_of" nor "stack_compress": -> 0
        #   If "stack_compress": -> max of stack size - 1
        #   If "next_of": -> increase by 1
        self.cache_size = 1 if (self.cache is not None) else 0
        if self.compress_any:
            for name in self.stack_compress:
                self.cache_size = max(self.cache_size,
                                      np.array(self.env_dict[name]["shape"],
                                               ndmin=1,copy=False)[-1] -1)

        if self.has_next_of:
            self.cache_size += 1
            for name in self.next_of:
                self.next_[name] = self.buffer[name][0].copy()

    def __init__(self,size,env_dict=None,*,
                 next_of=None,stack_compress=None,default_dtype=None,Nstep=None,
                 mmap_prefix=None,
                 **kwargs):
        r"""Initialize ``ReplayBuffer``

        Parameters
        ----------
        size : int
            Buffer size
        env_dict : dict of dict, optional
            Dictionary specifying environments. The keys of ``env_dict`` become
            environment names. The values of ``env_dict``, which are also ``dict``,
            define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to
            ``default_dtype``)
        next_of : str or array-like of str, optional
            Value names whose next items share the same memory region.
            The ``"next_"`` prefixed items (e.g. ``next_obs`` for ``obs``) are
            automatically added to ``env_dict`` without duplicating memory.
        stack_compress : str or array-like of str, optional
            Value names whose duplicated stack dimension is compressed.
            The values must be stacked along the last dimension.
        default_dtype : numpy.dtype, optional
            Fallback dtype. The default value is ``numpy.single``
        Nstep : dict, optional
            If this option is specified, the Nstep reward is used.
            ``Nstep["size"]`` is ``int`` specifying the step size of the Nstep reward.
            ``Nstep["rew"]`` is ``str`` or array-like of ``str`` specifying
            Nstep rewards to be summed. ``Nstep["gamma"]`` is a float specifying
            the discount factor, whose default is ``0.99``. ``Nstep["next"]`` is
            ``str`` or list of ``str`` specifying next values to be moved.
            When this option is enabled, ``"done"`` is required in ``env_dict``.
        mmap_prefix : str, optional
            File name prefix to map buffer data using mmap. If ``None`` (default),
            stores only on memory. This feature is designed for very large data
            which cannot fit in physical memory.


        Examples
        --------
        Create a simple replay buffer with buffer size of :math:`10^6`.

        >>> rb = ReplayBuffer(1e+6,
        ...                   {"obs": {"shape": 3}, "act": {}, "rew": {},
        ...                    "next_obs": {"shape": 3}, "done": {}})

        Create a replay buffer with ``np.float64``, but only ``"act"`` is ``np.int8``.

        >>> rb = ReplayBuffer(1e+6,
        ...                   {"obs": {"shape": 3}, "act": {"dtype": np.int8},
        ...                    "rew": {},
        ...                    "next_obs": {"shape": 3}, "done": {}},
        ...                   default_dtype = np.float64)

        Create a replay buffer with ``next_of`` memory compression for ``"obs"``.
        In this example, ``"next_obs"`` is automatically added and shares memory
        with ``"obs"``.

        >>> rb = ReplayBuffer(1e+6,
        ...                   {"obs": {"shape": 3}, "act": {}, "rew": {}, "done": {}},
        ...                   next_of="obs")

        Create a replay buffer with ``stack_compress`` memory compression for ``"obs"``.
        The stacked data must be a sliding window over sequential data, and the last
        dimension is the stack dimension.

        >>> rb = ReplayBuffer(1e+6,
        ...                   {"obs": {"shape": (3,2)}},
        ...                   stack_compress="obs")
        >>> rb.add(obs=[[1,2],
        ...             [1,2],
        ...             [1,2]])
        0
        >>> rb.add(obs=[[2,3],
        ...             [2,3],
        ...             [2,3]])
        1

        Create a very large replay buffer mapped on disk.

        >>> rb = ReplayBuffer(1e+9, {"obs": {"shape": 3}}, mmap_prefix="rb_data")
        """
        pass


    def add(self,**kwargs):
        r"""Add transition(s) into replay buffer.

        Multiple sets of transitions can be added simultaneously.

        Parameters
        ----------
        **kwargs : array like or float or int
            Transitions to be stored.

        Returns
        -------
        int or None
            The first index of the stored position. If all transitions are stored
            into ``NstepBuffer`` and no transitions are stored into the main buffer,
            ``None`` is returned.

        Raises
        ------
        KeyError
            If any values defined at the constructor are missing.

        Warnings
        --------
        All values must be passed by key-value style (keyword arguments).
        It is the user's responsibility to ensure that all the values have
        the same step size.


        Examples
        --------
        >>> rb = ReplayBuffer(1e+6, {"obs": {"shape": 3}})

        Add a single transition: ``[1,2,3]``.

        >>> rb.add(obs=[1,2,3])

        Add three sequential steps: ``[1,2,3]``, ``[4,5,6]``,
        and ``[7,8,9]`` simultaneously.

        >>> rb.add(obs=[[1,2,3],
        ...             [4,5,6],
        ...             [7,8,9]])
        """
        if self.use_nstep:
            kwargs = self.nstep.add(**kwargs)
            if kwargs is None:
                return

        cdef size_t N = self.size_check.step_size(kwargs)

        cdef size_t index = self.index.fetch_add(N)
        cdef size_t end = index + N
        cdef size_t remain = 0
        cdef add_idx = np.arange(index,end)
        cdef size_t key_min = 0

        if end > self.buffer_size:
            remain = end - self.buffer_size
            add_idx[add_idx >= self.buffer_size] -= self.buffer_size

        if self.cache is not None:
            for _i in add_idx:
                self.cache.pop(_i, None)

        if self.compress_any and (remain or
                                  self.get_stored_size() == self.buffer_size):
            key_min = remain or end
            for key in range(key_min,
                             min(key_min + self.cache_size, self.buffer_size)):
                self.add_cache_i(key, index)

        for name, b in self.buffer.items():
            b[add_idx] = np.reshape(np.array(kwargs[name],copy=False,ndmin=2),
                                    self.env_dict[name]["add_shape"])

        if self.has_next_of:
            for name in self.next_of:
                self.next_[name][...] = np.reshape(np.array(kwargs[f"next_{name}"],
                                                            copy=False,
                                                            ndmin=2),
                                                   self.env_dict[name]["add_shape"])[-1]

        self.episode_len += N
        return index


    def get_all_transitions(self,shuffle: bool=False):
        r"""
        Get all transitions stored in replay buffer.

        Parameters
        ----------
        shuffle : bool, optional
            When ``True``, transitions are shuffled. The default value is ``False``.

        Returns
        -------
        transitions : dict of numpy.ndarray
            All transitions stored in this replay buffer.
        """
        idx = np.arange(self.get_stored_size())

        if shuffle:
            np.random.shuffle(idx)

        return self._encode_sample(idx)

    def save_transitions(self, file, *, safe=True):
        r"""
        Save transitions to file

        Parameters
        ----------
        file : str or file-like object
            File to write data to
        safe : bool, optional
            If ``False``, we try more aggressive compression,
            which might encounter future incompatibility.
        """
        FORMAT_VERSION = 1
        if (safe or not (self.compress_any or self.has_next_of)):
            data = {"safe": True,
                    "version": FORMAT_VERSION,
                    "data": self.get_all_transitions(),
                    "Nstep": self.is_Nstep(),
                    "cache": None,
                    "next_of": None}
        else:
            self.add_cache()
            N = self.get_stored_size()
            if N == self.get_buffer_size():
                b = self.buffer
            else:
                b = {k: v[:N] for k, v in self.buffer.items()}

            data = {"safe": False,
                    "version": FORMAT_VERSION,
                    "data": b,
                    "Nstep": self.is_Nstep(),
                    "cache": self.cache,
                    "next_of": self.next_of}
        np.savez_compressed(file, **data)

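    # A hedged round-trip sketch for ``save_transitions``/``load_transitions``
    # (hypothetical file name; the loading buffer must be constructed with a
    # compatible ``env_dict``):
    #
    # >>> rb.save_transitions("transitions.npz")
    # >>> rb2 = ReplayBuffer(rb.get_buffer_size(), env_dict)
    # >>> rb2.load_transitions("transitions.npz")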

    def _load_transitions_v1(self, data):
        d = unwrap(data["data"])
        N = data["Nstep"]

        if data["safe"]:
            if N:
                self.use_nstep = False
                self.add(**d)
                self.use_nstep = True
            else:
                self.add(**d)
            return None

        c = unwrap(data["cache"])
        n = unwrap(data["next_of"])

        cache_idx = np.sort([i for i in c.keys()])

        for k, v in d.items():
            _size = v.shape[0]
            break

        if N:
            self.use_nstep = False

        idx = 0
        for i in cache_idx:
            if idx < i:
                merge = {k: v[idx:i] for k,v in d.items()}
                if n is not None:
                    merge = {**merge, **{f"next_{k}": d[k][idx+1:i+1] for k in n}}
                self.add(**merge)
            merge = {**{k: v[i] for k,v in d.items()}, **c[i]}
            self.add(**merge)
            self.on_episode_end()
            idx = i+1

        if idx < _size:
            if idx < _size - 1:
                merge = {k: v[idx:_size-1] for k,v in d.items()}
                if n is not None:
                    merge = {**merge, **{f"next_{k}": d[k][idx+1:_size] for k in n}}
                self.add(**merge)

            merge = {k: v[_size-1] for k,v in d.items()}
            if n is not None:
                merge = {**merge, **{f"next_{k}": d[k][0] for k in n}}
            self.add(**merge)

        if N:
            self.use_nstep = True

    def load_transitions(self, file):
        r"""
        Load transitions from file

        Parameters
        ----------
        file : str or file-like object
            File to read data from

        Raises
        ------
        ValueError
            When the file format is wrong.

        Warnings
        --------
        In order to avoid security vulnerabilities,
        you **must not** load untrusted files, since this method is
        based on ``pickle``.
        """
        with np.load(file, allow_pickle=True) as data:
            version = data["version"]
            N = data["Nstep"]

            if (N and not self.is_Nstep()) or (not N and self.is_Nstep()):
                raise ValueError("Stored data and buffer mismatch for Nstep")

            if version == 1:
                self._load_transitions_v1(data)
            else:
                raise ValueError(f"Unknown Format Version: {version}")


    def _encode_sample(self,idx):
        cdef sample = {}
        cdef next_idx
        cdef cache_idx
        cdef bool use_cache

        idx = np.array(idx,copy=False,ndmin=1)
        for name, b in self.buffer.items():
            sample[name] = b[idx]

        if self.has_next_of:
            next_idx = idx + 1
            next_idx[next_idx == self.get_buffer_size()] = 0
            cache_idx = (next_idx == self.get_next_index())
            use_cache = cache_idx.any()

            for name in self.next_of:
                sample[f"next_{name}"] = self.buffer[name][next_idx]
                if use_cache:
                    # Cache for the latest "next_***" stored at `self.next_`
                    sample[f"next_{name}"][cache_idx] = self.next_[name]

        cdef size_t i,_i
        cdef size_t N = idx.shape[0]
        if self.cache is not None:
            # Cache for episode ends stored at `self.cache`
            for _i in range(N):
                i = idx[_i]
                if i in self.cache:
                    if self.has_next_of:
                        for name in self.next_of:
                            sample[f"next_{name}"][_i] = self.cache[i][f"next_{name}"]
                    if self.compress_any:
                        for name in self.stack_compress:
                            sample[name][_i] = self.cache[i][name]

        return sample

    def sample(self,batch_size):
        r"""Sample the stored transitions randomly with specified size

        Parameters
        ----------
        batch_size : int
            sampled batch size

        Returns
        -------
        sample : dict of ndarray
            Sampled batch transitions, which might contain
            the same transition multiple times.

        Examples
        --------
        >>> rb = ReplayBuffer(1e+6, {"obs": {"shape": 3}})
        >>> rb.add(obs=[1,2,3])
        >>> rb.add(obs=[[1,2,3],[1,2,3]])
        >>> rb.sample(4)
        {'obs': array([[1., 2., 3.],
               [1., 2., 3.],
               [1., 2., 3.],
               [1., 2., 3.]], dtype=float32)}
        """
        cdef idx = np.random.randint(0,self.get_stored_size(),batch_size)
        return self._encode_sample(idx)


    cpdef void clear(self) except *:
        r"""Clear replay buffer.

        Set ``index`` and ``stored_size`` to ``0``.

        Example
        -------
        >>> rb = ReplayBuffer(5,{"done": {}})
        >>> rb.add(done=1)
        >>> rb.get_stored_size()
        1
        >>> rb.get_next_index()
        1
        >>> rb.clear()
        >>> rb.get_stored_size()
        0
        >>> rb.get_next_index()
        0
        """
        self.index.clear()
        self.episode_len = 0

        self.cache = {} if (self.has_next_of or self.compress_any) else None

        if self.use_nstep:
            self.nstep.clear()

    cpdef size_t get_stored_size(self):
        r"""Get stored size

        Returns
        -------
        size_t
            stored size
        """
        return self.index.get_stored_size()

    cpdef size_t get_buffer_size(self):
        r"""Get buffer size

        Returns
        -------
        size_t
            buffer size
        """
        return self.buffer_size

    cpdef size_t get_next_index(self):
        r"""Get the next index to store

        Returns
        -------
        size_t
            the next index to store
        """
        return self.index.get_next_index()

    cdef void add_cache(self):
        r"""Add last items into cache

        The last items for the ``next_of`` and ``stack_compress`` optimizations
        are moved to the cache area.

        If ``self.cache is None``, do nothing.
        If ``self.get_stored_size() == 0``, do nothing.
        """

        # If there is no cache configuration, do nothing
        if self.cache is None:
            return

        # If nothing is stored, do nothing
        if self.get_stored_size() == 0:
            return

        cdef size_t key_end = (self.get_next_index() or self.buffer_size)
        # Next index (without wraparound): key_end in [1,...,self.buffer_size]

        cdef size_t key_min = 0
        cdef size_t max_cache = min(self.cache_size,self.episode_len)
        if key_end > max_cache:
            key_min = key_end - max_cache

        cdef size_t key = 0
        cdef size_t next_key = 0
        for key in range(key_min, key_end): # key_end is excluded
            self.add_cache_i(key, key_end)

    cdef void add_cache_i(self, size_t key, size_t key_end):
        # If key is already cached, don't do anything
        if key in self.cache:
            return

        cdef size_t next_key = key + 1
        cdef cache_key = {}

        if self.has_next_of:
            if next_key == key_end:
                for name, value in self.next_.items():
                    cache_key[f"next_{name}"] = value.copy()
            else:
                for name in self.next_.keys():
                    cache_key[f"next_{name}"] = self.buffer[name][next_key].copy()

        if self.compress_any:
            for name in self.stack_compress:
                cache_key[name] = self.buffer[name][key].copy()

        self.cache[key] = cache_key

    cpdef void on_episode_end(self) except *:
        r"""Call on episode end

        Finalize the current episode by moving the remaining Nstep buffer
        transitions, evacuating overlapped data for the memory compression
        features, and resetting the episode length.

        Notes
        -----
        Calling this function at episode end is the user's responsibility,
        since episode exploration can be terminated at a certain length
        even though no ``done`` flag from the environment is set.
        """
        if self.use_nstep:
            self.use_nstep = False
            self.add(**self.nstep.on_episode_end())
            self.use_nstep = True

        self.add_cache()

        self.episode_len = 0

    cpdef size_t get_current_episode_len(self):
        r"""Get current episode length

        Returns
        -------
        size_t
            Current episode length
        """
        return self.episode_len

    cpdef bool is_Nstep(self):
        r"""Get whether Nstep is used or not

        Returns
        -------
        bool
            Whether Nstep is used
        """
        return self.use_nstep

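# A hedged end-to-end sketch of the ``ReplayBuffer`` API above; ``env`` and
# ``policy`` are hypothetical, only the cpprb calls are from this file:
#
# >>> rb = ReplayBuffer(int(1e+5),
# ...                   {"obs": {"shape": 3}, "act": {}, "rew": {},
# ...                    "next_obs": {"shape": 3}, "done": {}})
# >>> obs = env.reset()
# >>> for _ in range(n_steps):
# ...     act = policy(obs)
# ...     next_obs, rew, done = env.step(act)
# ...     rb.add(obs=obs, act=act, rew=rew, next_obs=next_obs, done=done)
# ...     if done:
# ...         rb.on_episode_end()
# ...         obs = env.reset()
# ...     else:
# ...         obs = next_obs
# >>> batch = rb.sample(32)  # dict of ndarray keyed by env_dict names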

@cython.embedsignature(True)
cdef class PrioritizedReplayBuffer(ReplayBuffer):
    r"""Prioritized Replay Buffer class to store transitions with priorities.

    In this class, transitions are sampled with probabilities corresponding
    to their priorities.

    Notes
    -----
    In Prioritized Experience Replay (PER) [1]_, transitions are sampled
    with probabilities calculated from TD error. This class implements the
    proportional variant, where :math:`p_i = (|TD|_i + \varepsilon)^{\alpha}`.

    References
    ----------
    .. [1] T. Schaul et al, "Prioritized Experience Replay", ICLR (2016),
       https://arxiv.org/abs/1511.05952
    """
    cdef VectorFloat weights
    cdef VectorSize_t indexes
    cdef float alpha
    cdef CppPrioritizedSampler[float]* per
    cdef NstepBuffer priorities_nstep
    cdef bool check_for_update
    cdef bool [:] unchange_since_sample
    cdef vector[size_t] idx_vec
    cdef vector[float] ps_vec

    def __cinit__(self,size,env_dict=None,*,alpha=0.6,Nstep=None,eps=1e-4,
                  check_for_update=False,**kwargs):
        self.alpha = alpha
        self.per = new CppPrioritizedSampler[float](size,alpha)
        self.per.set_eps(eps)
        self.weights = VectorFloat()
        self.indexes = VectorSize_t()

        if self.use_nstep:
            self.priorities_nstep = NstepBuffer({"priorities": {"dtype": np.single},
                                                 "done": {}},
                                                {"size": Nstep["size"]})

        self.check_for_update = check_for_update
        if self.check_for_update:
            self.unchange_since_sample = np.ones(np.array(size,
                                                          copy=False,
                                                          dtype='int'),
                                                 dtype='bool')

        self.idx_vec = vector[size_t]()
        self.ps_vec = vector[float]()

    def __init__(self,size,env_dict=None,*,alpha=0.6,Nstep=None,eps=1e-4,
                 check_for_update=False,**kwargs):
        r"""Initialize ``PrioritizedReplayBuffer``

        Parameters
        ----------
        size : int
            Buffer size
        env_dict : dict of dict, optional
            Dictionary specifying environments. The keys of ``env_dict`` become
            environment names. The values of ``env_dict``, which are also ``dict``,
            define ``"shape"`` (default ``1``) and ``"dtype"``
            (fallback to ``default_dtype``)
        alpha : float, optional
            :math:`\alpha`, the exponent of the stored priorities, whose
            default value is ``0.6``.
        eps : float, optional
            :math:`\epsilon`, a small positive constant ensuring that error-less
            states can still be sampled, whose default value is ``1e-4``.
        check_for_update : bool
            If the value is ``True`` (default value is ``False``),
            this buffer traces updated indices after the last call of the
            ``sample()`` method to avoid mis-updating priorities of already
            overwritten values. This feature is designed for multiprocess learning.

        See Also
        --------
        ReplayBuffer : Any optional parameters at ReplayBuffer are valid, too.


        Notes
        -----
        The minimum and the summation over certain ranges of the pre-calculated
        priorities :math:`(p_{i} + \epsilon )^{ \alpha }` are stored in segment
        trees, which enable fast sampling.
        """
        pass

    def add(self,*,priorities = None,**kwargs):
        r"""Add transition(s) into replay buffer.

        Multiple sets of transitions can be added simultaneously.

        Parameters
        ----------
        priorities : array like or float, optional
            Priorities of each environment. When no priorities are passed,
            the maximum priority so far is used.
        **kwargs : array like or float or int
            Transitions to be stored.

        Returns
        -------
        int or None
            The first index of the stored position. If all transitions are stored
            into ``NstepBuffer`` and no transitions are stored into the main buffer,
            ``None`` is returned.

        Raises
        ------
        KeyError
            If any values defined at the constructor are missing.

        Warnings
        --------
        All values must be passed by key-value style (keyword arguments).
        It is the user's responsibility to ensure that all the values have
        the same step size.
        """
        cdef size_t N = self.size_check.step_size(kwargs)
        if priorities is not None:
            priorities = np.ravel(np.array(priorities,copy=False,
                                           ndmin=1,dtype=np.single))
            if N != priorities.shape[0]:
                raise ValueError("`priorities` shape is incompatible")

        if self.use_nstep:
            if priorities is None:
                priorities = np.full((N),self.get_max_priority(),dtype=np.single)

            priorities = self.priorities_nstep.add(priorities=priorities,
                                                   done=np.array(kwargs["done"],
                                                                 copy=True))
            if priorities is not None:
                priorities = np.ravel(priorities["priorities"])
                N = priorities.shape[0]

        cdef maybe_index = super().add(**kwargs)
        if maybe_index is None:
            return None

        cdef size_t index = maybe_index
        cdef const float [:] ps

        if priorities is not None:
            ps = np.ravel(np.array(priorities,copy=False,ndmin=1,dtype=np.single))
            self.per.set_priorities(index,&ps[0],N,self.get_buffer_size())
        else:
            self.per.set_priorities(index,N,self.get_buffer_size())

        if self.check_for_update:
            if index+N <= self.buffer_size:
                self.unchange_since_sample[index:index+N] = False
            else:
                self.unchange_since_sample[index:] = False
                self.unchange_since_sample[:index+N-self.buffer_size] = False

        return index

    def sample(self,batch_size,beta = 0.4):
        r"""Sample the stored transitions.

        Transitions are sampled depending on the corresponding priorities,
        with the specified size.

        Parameters
        ----------
        batch_size : int
            Sampled batch size
        beta : float, optional
            The exponent of the weight for relaxation of the importance
            sampling effect, whose default value is ``0.4``

        Returns
        -------
        dict of ndarray
            Sampled batch transitions which also include
            ``"weights"`` and ``"indexes"``

        Notes
        -----
        When ``beta`` is ``0``, the weights become uniform. When ``beta`` is ``1``,
        the weights become the usual importance sampling weights.
        The ``weights`` are also normalized by the weight for the minimum priority
        (:math:`= w_{i}/\max_{j}(w_{j})`), which ensures the weights :math:`\leq` 1.
        """
        self.per.sample(batch_size,beta,
                        self.weights.vec,self.indexes.vec,
                        self.get_stored_size())
        cdef idx = self.indexes.as_numpy()
        samples = self._encode_sample(idx)
        samples['weights'] = self.weights.as_numpy()
        samples['indexes'] = idx

        if self.check_for_update:
            self.unchange_since_sample[:] = True

        return samples

    def update_priorities(self,indexes,priorities):
        r"""Update priorities

        Update the priorities specified by indexes. If this
        ``PrioritizedReplayBuffer`` is constructed with
        ``check_for_update=True``, indexes whose values were updated
        after the last call of the ``sample()`` method are ignored.

        Parameters
        ----------
        indexes : array_like
            Indexes of priorities to update
        priorities : array_like
            Priorities to update

        Raises
        ------
        TypeError
            When ``indexes`` or ``priorities`` are ``None``
        """

        if priorities is None:
            raise TypeError("``priorities`` must not be ``None``")

        cdef const size_t [:] idx = Csize(indexes)
        cdef const float [:] ps = Cfloat(priorities)

        if not self.check_for_update:
            self.per.update_priorities(&idx[0],&ps[0],idx.shape[0])
            return None

        self.idx_vec.clear()
        self.idx_vec.reserve(idx.shape[0])

        self.ps_vec.clear()
        self.ps_vec.reserve(ps.shape[0])

        if self.check_for_update:
            for _i in range(idx.shape[0]):
                if self.unchange_since_sample[idx[_i]]:
                    self.idx_vec.push_back(idx[_i])
                    self.ps_vec.push_back(ps[_i])

        cdef N = self.idx_vec.size()
        if N > 0:
            self.per.update_priorities(self.idx_vec.data(),self.ps_vec.data(),N)

    cpdef void clear(self) except *:
        r"""Clear replay buffer
        """
        super(PrioritizedReplayBuffer,self).clear()
        clear(self.per)
        if self.use_nstep:
            self.priorities_nstep.clear()

    cpdef float get_max_priority(self):
        r"""Get the max priority of stored priorities

        Returns
        -------
        max_priority : float
            Max priority of stored priorities
        """
        return self.per.get_max_priority()

    cpdef void on_episode_end(self) except *:
        r"""Call on episode end

        Finalize the current episode by moving the remaining Nstep buffer
        transitions, evacuating overlapped data for the memory compression
        features, and resetting the episode length.

        Notes
        -----
        Calling this function at episode end is the user's responsibility,
        since episode exploration can be terminated at a certain length
        even though no ``done`` flag from the environment is set.
        """
        if self.use_nstep:
            self.use_nstep = False
            self.add(**self.nstep.on_episode_end(),
                     priorities=self.priorities_nstep.on_episode_end()["priorities"])
            self.use_nstep = True

        self.add_cache()

        self.episode_len = 0


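# A hedged usage sketch of the PER loop above; ``compute_td_error`` is a
# hypothetical user-defined function, only the cpprb calls are from this file:
#
# >>> prb = PrioritizedReplayBuffer(int(1e+5),
# ...                               {"obs": {"shape": 3}, "rew": {}, "done": {}},
# ...                               alpha=0.6)
# >>> prb.add(obs=[1,2,3], rew=1.0, done=0.0)
# >>> batch = prb.sample(32, beta=0.4)
# >>> td_error = compute_td_error(batch)
# >>> prb.update_priorities(batch["indexes"], np.abs(td_error))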

1826@cython.embedsignature(True) 

1827cdef class ReverseReplayBuffer(ReplayBuffer): 

1828 r"""Replay Buffer class for Reverse Experience Replay (RER) 

1829  

1830  

1831 Notes 

1832 ----- 

1833 Reverse Experience Replay (RER) [1]_ samples equally strided 

1834 transitions in reverse order. 

1835  

1836 .. math:: 

1837 \begin{align} 

1838 \text{sample-1}: &T_t , &T_{t-stride} , &\dots, &T_{t-batch\_size\times stride}\\ 

1839 \text{sample-2}: &T_{t-1}, &T_{t-stride-1}, &\dots, &T_{t-batch\_size\times stride-1}\\ 

1840 \dots&&&& 

1841 \end{align} 

1842  

1843 When the first index ``t-i`` lags behind the latest index by more 

1844 than ``2*stride``, the first index is reset to the latest one. 

1845  

1846  

1847 References 

1848 ---------- 

1849 .. [1] E. Rotinov, "Reverse Experience Replay" (2019), 

1850 https://arxiv.org/abs/1910.08780 

1851 """ 

1852 cdef size_t stride 

1853 cdef size_t last_sampled_index 

1854 def __cinit__(self, size, env_dict=None, *, stride = 300, **kwargs): 

1855 self.stride = stride 

1856 self.last_sampled_index = 0 

1857  

1858 def __init__(self, size, env_dict=None,*, stride = 300, **kwargs): 

1859 r""" 

1860 Initialize ReverseReplayBuffer 

1861  

1862 Parameters 

1863 ---------- 

1864 size : int 

1865 Buffer size 

1866 next_of : str or array like of str, optional 

1867 Value names whose next items share the memory region. 

1868 The ``"next_"`` prefixed items (e.g. ``next_obs`` for ``obs``) are 

1869 automatically added to ``env_dict`` without duplicated memory. 

1870 stack_compress : str or array like of str, optional 

1871 Value names whose duplicated stack dimension is compressed. 

1872 The values must have their stacked dimension as the last dimension. 

1873 default_dtype : numpy.dtype, optional 

1874 Fallback dtype for values not specified in ``env_dict``. The default is 

1875 ``numpy.single`` 

1876 Nstep : dict, optional 

1877 ``Nstep["size"]`` is an ``int`` specifying the step size of the Nstep reward. 

1878 ``Nstep["rew"]`` is a ``str`` or array like of ``str`` specifying 

1879 the Nstep reward to be summed. ``Nstep["gamma"]`` is a ``float`` specifying 

1880 the discount factor; its default is ``0.99``. ``Nstep["next"]`` is a ``str`` or 

1881 list of ``str`` specifying next values to be moved. 

1882 mmap_prefix : str, optional 

1883 File name prefix to save buffer data using mmap. If ``None`` (default), 

1884 save only on memory. 

1885 stride : int, optional 

1886 stride size. The default is ``300``. 

1887  

1888  

1889 See Also 

1890 -------- 

1891 ReplayBuffer : Any optional parameters at ReplayBuffer are valid, too. 

1892 """ 

1893 super().__init__(size, env_dict, **kwargs) 

1894  

1895 def sample(self, batch_size): 

1896 r"""Sample the stored transitions reversely 

1897  

1898 Parameters 

1899 ---------- 

1900 batch_size : int 

1901 sampled batch size 

1902  

1903 Returns 

1904 ------- 

1905 dict of ndarray 

1906 Sampled batch transitions, which might contain 

1907 the same transition multiple times. 

1908 """ 

1909 cdef size_t nidx = self.get_next_index() 

1910 cdef size_t ssize = self.get_stored_size() 

1911  

1912 cdef size_t tmp_nidx = nidx 

1913 if tmp_nidx <= self.last_sampled_index: 

1914 tmp_nidx += ssize 

1915  

1916 if (tmp_nidx - self.last_sampled_index) >= 2 * self.stride: 

1917 self.last_sampled_index = (nidx or ssize) - 1 

1918 else: 

1919 self.last_sampled_index = (self.last_sampled_index or ssize) - 1 

1920  

1921 cdef idx = np.zeros(batch_size, dtype = np.uint) 

1922  

1923 # Ensure (idx >= 0).all() 

1924 cdef size_t i 

1925 cdef size_t tmp = self.last_sampled_index 

1926 for i in range(batch_size): 

1927 idx[i] = tmp 

1928 while tmp < self.stride: 

1929 tmp += ssize 

1930 tmp -= self.stride 

1931  

1932 return self._encode_sample(idx) 

1933  
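# Illustrative walk-through (not part of the source): with ``stride=3`` and
# the newest transition at index ``t``, consecutive ``sample(batch_size=3)``
# calls return indices
#
#     call 1 -> [t,   t-3, t-6]
#     call 2 -> [t-1, t-4, t-7]
#     call 3 -> [t-2, t-5, t-8]
#
# until the cursor lags more than ``2*stride`` behind the newest data, at
# which point it is reset to the latest index.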

1934  

1935@cython.embedsignature(True) 

1936cdef class MPReplayBuffer: 

1937 r"""Multi-process support Replay Buffer class to store transitions and to sample them randomly. 

1938  

1939 This class works across multiple processes without manually locking the entire buffer. 

1940  

1941 A transition can contain any values compatible with NumPy data 

1942 types. Users can freely specify them through the ``env_dict`` parameter 

1943 of the constructor. 

1944  

1945 A standard transition contains observation (``obs``), action (``act``), 

1946 reward (``rew``), the next observation (``next_obs``), and done (``done``). 

1947  

1948 >>> env_dict = {"obs": {"shape": (4,4)}, 

1949 ... "act": {"shape": 3, "dtype": np.int16}, 

1950 ... "rew": {}, 

1951 ... "next_obs": {"shape": (4,4)}, 

1952 ... "done": {}} 

1953  

1954 In this class, sampling is uniformly random and the same transition 

1955 can be chosen multiple times. 

1956  

1957 See Also 

1958 -------- 

1959 ReplayBuffer : Single process version 

1960  

1961 Notes 

1962 ----- 

1963 This class assumes a single learner (``sample``) and multiple explorers (``add``) 

1964 like Ape-X [1]_. 

1965  

1966 References 

1967 ---------- 

1968 .. [1] D. Horgan et al., "Distributed Prioritized Experience Replay", ICLR (2018), 

1969 https://openreview.net/forum?id=H1Dy---0Z 

1970 https://arxiv.org/abs/1803.00933 

1971 """ 

1972 cdef buffer 

1973 cdef size_t buffer_size 

1974 cdef env_dict 

1975 cdef ProcessSafeRingBufferIndex index 

1976 cdef default_dtype 

1977 cdef StepChecker size_check 

1978 cdef explorer_ready 

1979 cdef explorer_count 

1980 cdef explorer_count_lock 

1981 cdef learner_ready 

1982 cdef backend 

1983  

1984 def __init__(self, size, env_dict=None, *, 

1985 default_dtype=None, logger=None, 

1986 ctx=None, backend="sharedctypes", 

1987 **kwargs): 

1988 r"""Initialize ``MPReplayBuffer`` 

1989  

1990 Parameters 

1991 ---------- 

1992 size : int 

1993 Buffer size 

1994 env_dict : dict of dict, optional 

1995 Dictionary specifying environments. The keys of ``env_dict`` become 

1996 environment names. The values of ``env_dict``, which are also ``dict``, 

1997 define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to 

1998 ``default_dtype``) 

1999 default_dtype : numpy.dtype, optional 

2000 Fallback dtype for values not specified in ``env_dict``. 

2001 default is ``numpy.single`` 

2002 ctx : ForkContext, SpawnContext, or SyncManager, optional 

2003 Context created by ``multiprocessing.get_context()`` or ``SyncManager``. 

2004 If ``None`` (default), the default context is used. 

2005 backend : {"sharedctypes", "SharedMemory"} 

2006 Shared memory (shm) backend to map buffer. The default is 

2007 ``"sharedctypes"``. ``"SharedMemory"`` is available only for Python 3.8+. 

2008 """ 

2009 self.env_dict = env_dict.copy() if env_dict else {} 

2010 ctx = ctx or mp.get_context() 

2011 try_start(ctx) 

2012  

2013 if not _has_SharedMemory and backend == "SharedMemory": 

2014 backend = "sharedctypes" 

2015 self.backend = backend 

2016  

2017 cdef special_keys = [] 

2018  

2019 self.buffer_size = size 

2020 self.index = ProcessSafeRingBufferIndex(self.buffer_size, ctx, 

2021 self.backend) 

2022  

2023 self.default_dtype = default_dtype or np.single 

2024  

2025 # side effect: Add "add_shape" key into self.env_dict 

2026 self.buffer = dict2buffer(self.buffer_size,self.env_dict, 

2027 default_dtype = self.default_dtype, 

2028 shared = self.backend, 

2029 ctx = ctx) 

2030  

2031 self.size_check = StepChecker(self.env_dict,special_keys) 

2032  

2033 self.learner_ready = ctx.Event() 

2034 self.learner_ready.clear() 

2035 self.explorer_ready = ctx.Event() 

2036 self.explorer_ready.set() 

2037 self.explorer_count = RawValue(ctx, ctypes.c_size_t, 0, self.backend) 

2038 self.explorer_count_lock = ctx.Lock() 

2039  

2040 cdef void _lock_explorer(self) except *: 

2041 self.explorer_ready.wait() # Wait permission 

2042 self.learner_ready.clear() # Block learner 

2043 with self.explorer_count_lock: 

2044 self.explorer_count.value += 1 

2045  

2046 cdef void _unlock_explorer(self) except *: 

2047 with self.explorer_count_lock: 

2048 self.explorer_count.value -= 1 

2049 if self.explorer_count.value == 0: 

2050 self.learner_ready.set() 

2051  

2052 cdef void _lock_learner(self) except *: 

2053 self.explorer_ready.clear() # New explorer cannot enter into critical section 

2054 self.learner_ready.wait() # Wait until all explorer exit from critical section 

2055  

2056 cdef void _unlock_learner(self) except *: 

2057 self.explorer_ready.set() # Allow workers to enter into critical section 

2058  
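# The four helpers above form an event-based, readers-writer style protocol:
# any number of explorers may enter the critical section concurrently (a
# shared counter tracks how many are inside), while the single learner blocks
# new explorers and waits until the counter drops to zero before reading.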

2059 def add(self,*,**kwargs): 

2060 r"""Add transition(s) into replay buffer. 

2061  

2062 Multiple sets of transitions can be added simultaneously. This method 

2063 can be called from multiple explorer processes without manual lock. 

2064  

2065 Parameters 

2066 ---------- 

2067 **kwargs : array like or float or int 

2068 Transitions to be stored. 

2069  

2070 Returns 

2071 ------- 

2072 int 

2073 The first index of stored position. 

2074  

2075 Raises 

2076 ------ 

2077 KeyError 

2078 If any values defined at constructor are missing. 

2079  

2080 Warnings 

2081 -------- 

2082 All values must be passed by key-value style (keyword arguments). 

2083 It is the user's responsibility to ensure all the values have the same step size. 

2084 """ 

2085 cdef size_t N = self.size_check.step_size(kwargs) 

2086  

2087 cdef size_t index = self.index.fetch_add(N) 

2088 cdef size_t end = index + N 

2089 cdef add_idx = np.arange(index,end) 

2090  
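# Ring-buffer wraparound: indices running past the end wrap to the front.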

2091 if end > self.buffer_size: 

2092 add_idx[add_idx >= self.buffer_size] -= self.buffer_size 

2093  

2094  

2095 self._lock_explorer() 

2096  

2097 for name, b in self.buffer.items(): 

2098 b[add_idx] = np.reshape(np.array(kwargs[name],copy=False,ndmin=2), 

2099 self.env_dict[name]["add_shape"]) 

2100  

2101 self._unlock_explorer() 

2102 return index 

2103  

2104 def get_all_transitions(self,shuffle: bool=False): 

2105 r""" 

2106 Get all transitions stored in replay buffer. 

2107  

2108 Parameters 

2109 ---------- 

2110 shuffle : bool, optional 

2111 When ``True``, transitions are shuffled. The default value is ``False``. 

2112  

2113 Returns 

2114 ------- 

2115 transitions : dict of numpy.ndarray 

2116 All transitions stored in this replay buffer. 

2117 """ 

2118 idx = np.arange(self.get_stored_size()) 

2119  

2120 if shuffle: 

2121 np.random.shuffle(idx) 

2122  

2123 self._lock_learner() 

2124 ret = self._encode_sample(idx) 

2125 self._unlock_learner() 

2126  

2127 return ret 

2128  

2129 def _encode_sample(self,idx): 

2130 cdef sample = {} 

2131  

2132 idx = np.array(idx,copy=False,ndmin=1) 

2133  

2134 for name, b in self.buffer.items(): 

2135 sample[name] = b[idx] 

2136  

2137 return sample 

2138  

2139 def sample(self,batch_size): 

2140 r"""Sample the stored transitions randomly with specified size 

2141  

2142 This method can be called from a single learner process. 

2143  

2144 Parameters 

2145 ---------- 

2146 batch_size : int 

2147 sampled batch size 

2148  

2149 Returns 

2150 ------- 

2151 dict of ndarray 

2152 Sampled batch transitions, which might contain 

2153 the same transition multiple times. 

2154 """ 

2155 cdef idx = np.random.randint(0,self.get_stored_size(),batch_size) 

2156  

2157 self._lock_learner() 

2158 ret = self._encode_sample(idx) 

2159 self._unlock_learner() 

2160  

2161 return ret 

2162  
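# Illustrative multi-process sketch (not part of the source); ``explorer`` is
# a hypothetical user function:
#
#     import multiprocessing as mp
#     import numpy as np
#     from cpprb import MPReplayBuffer
#
#     def explorer(rb):
#         for _ in range(100):
#             rb.add(obs=np.ones(4), rew=1.0, done=0.0)
#
#     if __name__ == "__main__":
#         rb = MPReplayBuffer(1024, {"obs": {"shape": 4}, "rew": {}, "done": {}})
#         ps = [mp.Process(target=explorer, args=(rb,)) for _ in range(2)]
#         for p in ps: p.start()
#         for p in ps: p.join()
#         sample = rb.sample(32)  # in real use, the learner runs concurrently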

2163 cpdef void clear(self) except *: 

2164 r"""Clear replay buffer. 

2165  

2166 Set ``index`` and ``stored_size`` to ``0``. 

2167  

2168 Example 

2169 ------- 

2170 >>> rb = ReplayBuffer(5,{"done": {}}) 

2171 >>> rb.add(done=1) 

0 

2172 >>> rb.get_stored_size() 

2173 1 

2174 >>> rb.get_next_index() 

2175 1 

2176 >>> rb.clear() 

2177 >>> rb.get_stored_size() 

2178 0 

2179 >>> rb.get_next_index() 

2180 0 

2181 """ 

2182 self.index.clear() 

2183  

2184 cpdef size_t get_stored_size(self): 

2185 r"""Get stored size 

2186  

2187 Returns 

2188 ------- 

2189 size_t 

2190 Stored size 

2191 """ 

2192 return self.index.get_stored_size() 

2193  

2194 cpdef size_t get_buffer_size(self): 

2195 r"""Get buffer size 

2196  

2197 Returns 

2198 ------- 

2199 size_t 

2200 Buffer size 

2201 """ 

2202 return self.buffer_size 

2203  

2204 cpdef size_t get_next_index(self): 

2205 r"""Get the next index to store 

2206  

2207 Returns 

2208 ------- 

2209 size_t 

2210 Next index to store 

2211 """ 

2212 return self.index.get_next_index() 

2213  

2214 cpdef void on_episode_end(self) except *: 

2215 r"""Call on episode end 

2216  

2217 Notes 

2218 ----- 

2219 Calling this function at episode end is the user's responsibility, 

2220 since episode exploration can be terminated at a certain length 

2221 even though no ``done`` flag is set by the environment. 

2222  

2223 Warnings 

2224 -------- 

2225 Although nothing happens for ``MPReplayBuffer``, it is better to call this 

2226 because some functionality might be added in a future version. 

2227 """ 

2228 pass 

2229  

2230 cpdef bool is_Nstep(self): 

2231 r"""Get whether use Nstep or not 

2232  

2233 Since ``MPReplayBuffer`` doesn't support the Nstep feature, 

2234 the return value is always ``False``. 

2235  

2236 Returns 

2237 ------- 

2238 bool 

2239 ``False`` 

2240 """ 

2241 return False 

2242  

2243  

2244cdef class ThreadSafePrioritizedSampler: 

2245 cdef size_t size 

2246 cdef float alpha 

2247 cdef float eps 

2248 cdef backend 

2249 cdef max_p 

2250 cdef sum 

2251 cdef sum_a  # "anychanged" flag shared with the C++ sum tree 

2252 cdef min 

2253 cdef min_a  # "anychanged" flag shared with the C++ min tree 

2254 cdef CppThreadSafePrioritizedSampler[float]* per 

2255  

2256 def __init__(self,size,alpha,eps,max_p=None, 

2257 sum=None,sum_a=None, 

2258 min=None,min_a=None, 

2259 ctx = None, 

2260 backend = "sharedctypes"): 

2261 ctx = ctx or mp.get_context() 

2262 self.size = size 

2263 self.alpha = alpha 

2264 self.eps = eps 

2265 self.backend = backend 

2266  

2267 self.max_p = max_p or RawArray(ctx, ctypes.c_float,1,self.backend) 

2268 cdef float [:] view_max_p = self.max_p.ndarray 

2269  
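# Round the capacity up to the next power of two; the segment trees below need ``2*pow2size-1`` nodes.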

2270 cdef size_t pow2size = 1 

2271 while pow2size < size: 

2272 pow2size *= 2 

2273  

2274 self.sum = sum or RawArray(ctx, ctypes.c_float,2*pow2size-1, self.backend) 

2275 self.sum_a = sum_a or RawArray(ctx, ctypes.c_bool ,1 , self.backend) 

2276 self.min = min or RawArray(ctx, ctypes.c_float,2*pow2size-1, self.backend) 

2277 self.min_a = min_a or RawArray(ctx, ctypes.c_bool ,1 , self.backend) 

2278  

2279 cdef float [:] view_sum = self.sum.ndarray 

2280 cdef bool [:] view_sum_a = self.sum_a.ndarray 

2281 cdef float [:] view_min = self.min.ndarray 

2282 cdef bool [:] view_min_a = self.min_a.ndarray 

2283  

2284 cdef bool init = ((max_p is None) and 

2285 (sum is None) and 

2286 (sum_a is None) and 

2287 (min is None) and 

2288 (min_a is None)) 

2289  

2290 self.per = new CppThreadSafePrioritizedSampler[float](size,alpha, 

2291 &view_max_p[0], 

2292 &view_sum[0], 

2293 &view_sum_a[0], 

2294 &view_min[0], 

2295 &view_min_a[0], 

2296 init, 

2297 eps) 

2298  

2299 cdef CppThreadSafePrioritizedSampler[float]* ptr(self): 

2300 return self.per 

2301  

2302 def __reduce__(self): 

2303 return (ThreadSafePrioritizedSampler, 

2304 (self.size, self.alpha, self.eps, self.max_p, 

2305 self.sum, self.sum_a, self.min, self.min_a, 

2306 None, self.backend)) 

2307  

2308  

2309@cython.embedsignature(True) 

2310cdef class MPPrioritizedReplayBuffer(MPReplayBuffer): 

2311 r"""Multi-process support Prioritized Replay Buffer class to store transitions with priorities. 

2312  

2313 This class works across multiple processes without manual locking. 

2314  

2315 In this class, transitions are sampled with probabilities corresponding to their priorities. 

2316  

2317 Notes 

2318 ----- 

2319 This class assumes a single learner (``sample``, ``update_priorities``) and 

2320 multiple explorers (``add``) like Ape-X [1]_. 

2321  

2322 References 

2323 ---------- 

2324 .. [1] D. Horgan et al., "Distributed Prioritized Experience Replay", ICLR (2018), 

2325 https://openreview.net/forum?id=H1Dy---0Z 

2326 https://arxiv.org/abs/1803.00933 

2327 """ 

2328 cdef VectorFloat weights 

2329 cdef VectorSize_t indexes 

2330 cdef ThreadSafePrioritizedSampler per 

2331 cdef unchange_since_sample 

2332 cdef terminate 

2333 cdef explorer_per_count 

2334 cdef explorer_per_count_lock 

2335 cdef learner_per_ready 

2336 cdef explorer_per_ready 

2337 cdef vector[size_t] idx_vec 

2338 cdef vector[float] ps_vec 

2339  

2340 def __init__(self,size,env_dict=None,*,alpha=0.6,eps=1e-4,ctx=None,**kwargs): 

2341 r"""Initialize ``MPPrioritizedReplayBuffer`` 

2342  

2343 Parameters 

2344 ---------- 

2345 size : int 

2346 Buffer size 

2347 env_dict : dict of dict, optional 

2348 Dictionary specifying environments. The keys of ``env_dict`` become 

2349 environment names. The values of ``env_dict``, which are also ``dict``, 

2350 define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to 

2351 ``default_dtype``) 

2352 alpha : float, optional 

2353 :math:`\alpha`, the exponent applied to the stored priorities, whose 

2354 default value is ``0.6`` 

2355 eps : float, optional 

2356 :math:`\epsilon`, a small positive constant ensuring that zero-error states 

2357 can still be sampled, whose default value is ``1e-4``. 

2358  

2359 See Also 

2360 -------- 

2361 MPReplayBuffer : Any optional parameters at ``MPReplayBuffer`` are valid, too. 

2362 PrioritizedReplayBuffer : Single process version Prioritized Experience Replay 

2363  

2364  

2365 Notes 

2366 ----- 

2367 The minimum and summation over certain ranges of pre-calculated priorities 

2368 :math:`(p_{i} + \epsilon)^{\alpha}` are stored in segment trees, which 

2369 enables fast sampling. 

2370 """ 

2371 ctx = ctx or mp.get_context() 

2372 super().__init__(size,env_dict,ctx=ctx,**kwargs) 

2373  

2374 self.per = ThreadSafePrioritizedSampler(size,alpha,eps, 

2375 ctx=ctx, backend=self.backend) 

2376  

2377 self.weights = VectorFloat() 

2378 self.indexes = VectorSize_t() 

2379  

2380 self.unchange_since_sample = RawArray(ctx, ctypes.c_bool, size, self.backend) 

2381 self.unchange_since_sample[:] = True 

2382  

2383 self.terminate = RawValue(ctx, ctypes.c_bool,0, self.backend) 

2384 self.terminate.value = False 

2385  

2386 self.learner_per_ready = ctx.Event() 

2387 self.learner_per_ready.clear() 

2388 self.explorer_per_ready = ctx.Event() 

2389 self.explorer_per_ready.set() 

2390 self.explorer_per_count = RawValue(ctx, ctypes.c_size_t, 0, self.backend) 

2391 self.explorer_per_count_lock = ctx.Lock() 

2392  

2393 self.idx_vec = vector[size_t]() 

2394 self.ps_vec = vector[float]() 

2395  

2396  

2397 cdef void _lock_explorer_per(self) except *: 

2398 self.explorer_per_ready.wait() # Wait permission 

2399 self.learner_per_ready.clear() # Block learner 

2400 with self.explorer_per_count_lock: 

2401 self.explorer_per_count.value += 1 

2402  

2403 cdef void _unlock_explorer_per(self) except *: 

2404 with self.explorer_per_count_lock: 

2405 self.explorer_per_count.value -= 1 

2406 if self.explorer_per_count.value == 0: 

2407 self.learner_per_ready.set() 

2408  

2409 cdef void _lock_learner_per(self) except *: 

2410 self.explorer_per_ready.clear() 

2411 self.learner_per_ready.wait() 

2412  

2413 cdef void _unlock_learner_per(self) except *: 

2414 self.explorer_per_ready.set() 

2415  

2416 cdef void _lock_learner_unlock_learner_per(self) except *: 

2417 self.explorer_ready.clear() 

2418 self.explorer_per_ready.set() 

2419 self.learner_ready.wait() 

2420  

2421 def add(self,*,priorities = None,**kwargs): 

2422 r"""Add transition(s) into replay buffer. 

2423  

2424 Multiple sets of transitions can be added simultaneously. This method can be 

2425 called from multiple explorer processes without manual lock. 

2426  

2427 Parameters 

2428 ---------- 

2429 priorities : array like or float, optional 

2430 Priorities of each transition. When no priorities are passed, 

2431 the maximum priority so far is used. 

2432 **kwargs : array like or float or int 

2433 Transitions to be stored. 

2434  

2435 Returns 

2436 ------- 

2437 int 

2438 The first index of stored position. 

2439  

2440 Raises 

2441 ------ 

2442 KeyError 

2443 If any values defined at constructor are missing. 

2444  

2445 Warnings 

2446 -------- 

2447 All values must be passed by key-value style (keyword arguments). 

2448 It is the user's responsibility to ensure all the values have the same step size. 

2449 """ 

2450 cdef size_t N = self.size_check.step_size(kwargs) 

2451 cdef const float [:] ps 

2452  

2453 if priorities is not None: 

2454 priorities = np.ravel(np.array(priorities,copy=False, 

2455 ndmin=1,dtype=np.single)) 

2456 if N != priorities.shape[0]: 

2457 raise ValueError("`priorities` shape is incompatible") 

2458  

2459 cdef size_t index = self.index.fetch_add(N) 

2460 cdef size_t end = index + N 

2461 cdef add_idx = np.arange(index,end) 

2462  

2463 if end > self.buffer_size: 

2464 add_idx[add_idx >= self.buffer_size] -= self.buffer_size 

2465  

2466  

2467 self._lock_explorer_per() 

2468  

2469 if priorities is not None: 

2470 ps = np.ravel(np.array(priorities,copy=False,ndmin=1,dtype=np.single)) 

2471 self.per.ptr().set_priorities(index,&ps[0],N,self.get_buffer_size()) 

2472 else: 

2473 self.per.ptr().set_priorities(index,N,self.get_buffer_size()) 

2474  

2475 if index+N <= self.buffer_size: 

2476 self.unchange_since_sample[index:index+N] = False 

2477 else: 

2478 self.unchange_since_sample[index:] = False 

2479 self.unchange_since_sample[:index+N-self.buffer_size] = False 

2480  

2481 self._lock_explorer() 

2482 self._unlock_explorer_per() 

2483  

2484 for name, b in self.buffer.items(): 

2485 b[add_idx] = np.reshape(np.array(kwargs[name],copy=False,ndmin=2), 

2486 self.env_dict[name]["add_shape"]) 

2487  

2488 self._unlock_explorer() 

2489 return index 

2490  
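# Illustrative sketch (not part of the source): explorers may attach initial
# priorities when adding; otherwise the current max priority is used.
#
#     import numpy as np
#     from cpprb import MPPrioritizedReplayBuffer
#
#     rb = MPPrioritizedReplayBuffer(1024, {"obs": {"shape": 4}, "rew": {}, "done": {}})
#     rb.add(obs=np.ones(4), rew=1.0, done=0.0)                   # max priority
#     rb.add(obs=np.zeros(4), rew=0.0, done=0.0, priorities=0.5)  # explicit priority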

2491 def sample(self,batch_size,beta = 0.4): 

2492 r"""Sample the stored transitions. 

2493  

2494 Transitions are sampled according to their corresponding priorities 

2495 with the specified batch size. This method can be called from a single learner process. 

2496  

2497 Parameters 

2498 ---------- 

2499 batch_size : int 

2500 Sampled batch size 

2501 beta : float, optional 

2502 The exponent of weight for relaxation of importance 

2503 sampling effect, whose default value is ``0.4`` 

2504  

2505 Returns 

2506 ------- 

2507 dict of ndarray 

2508 Sampled batch transitions, which also include 

2509 ``"weights"`` and ``"indexes"`` 

2510  

2511 Notes 

2512 ----- 

2513 When ``beta`` is ``0``, the weights become uniform. When ``beta`` is ``1``, 

2514 the weights become the usual importance-sampling weights. 

2515 The ``weights`` are also normalized by the weight for the minimum priority 

2516 (:math:`= w_{i}/\max_{j}(w_{j})`), which ensures the weights satisfy :math:`w_{i} \leq 1`. 

2517 """ 

2518 self._lock_learner_per() 

2519 self.per.ptr().sample(batch_size,beta, 

2520 self.weights.vec,self.indexes.vec, 

2521 self.get_stored_size()) 

2522 cdef idx = self.indexes.as_numpy() 

2523  

2524 self._lock_learner_unlock_learner_per() 

2525  

2526 samples = self._encode_sample(idx) 

2527 self.unchange_since_sample[:] = True 

2528 self._unlock_learner() 

2529  

2530 samples['weights'] = self.weights.as_numpy() 

2531 samples['indexes'] = idx 

2532  

2533 return samples 

2534  

2535 def update_priorities(self,indexes,priorities): 

2536 r"""Update priorities 

2537  

2538 Update priorities at the specified indices. Indices 

2539 whose values have been updated since the last call to the ``sample()`` 

2540 method are ignored. This method can be called from a single learner process. 

2541  

2542 Parameters 

2543 ---------- 

2544 indexes : array_like 

2545 Indexes to update priorities 

2546 priorities : array_like 

2547 Priorities to update 

2548  

2549 Raises 

2550 ------ 

2551 TypeError 

2552 When ``indexes`` or ``priorities`` are ``None`` 

2553 """ 

2554  

2555 if priorities is None: 

2556 raise TypeError("`priorities` must not be `None`") 

2557  

2558 cdef const size_t [:] idx = Csize(indexes) 

2559 cdef const float [:] ps = Cfloat(priorities) 

2560  

2561 self.idx_vec.clear() 

2562 self.idx_vec.reserve(idx.shape[0]) 

2563  

2564 self.ps_vec.clear() 

2565 self.ps_vec.reserve(ps.shape[0]) 

2566  

2567 self._lock_learner_per() 

2568 cdef size_t stored_size = self.get_stored_size() 

2569 for _i in range(idx.shape[0]): 

2570 if idx[_i] < stored_size and self.unchange_since_sample[idx[_i]]: 

2571 self.idx_vec.push_back(idx[_i]) 

2572 self.ps_vec.push_back(ps[_i]) 

2573  

2574 cdef N = self.idx_vec.size() 

2575 if N > 0: 

2576 self.per.ptr().update_priorities(self.idx_vec.data(),self.ps_vec.data(),N) 

2577 self._unlock_learner_per() 

2578  

2579 cpdef void clear(self) except *: 

2580 r"""Clear replay buffer 

2581 """ 

2582 super(MPPrioritizedReplayBuffer,self).clear() 

2583 clear(self.per.ptr()) 

2584  

2585 cpdef float get_max_priority(self): 

2586 r"""Get the max priority of stored priorities 

2587  

2588 Returns 

2589 ------- 

2590 max_priority : float 

2591 The max priority of stored priorities 

2592 """ 

2593 return self.per.ptr().get_max_priority() 

2594  

2595 cpdef void on_episode_end(self) except *: 

2596 r"""Call on episode end 

2597  

2598 Notes 

2599 ----- 

2600 Calling this function at episode end is the user's responsibility, 

2601 since episode exploration can be terminated at a certain length 

2602 even though no ``done`` flag is set by the environment. 

2603 """ 

2604 pass 

2605  

2606  

2607@cython.embedsignature(True) 

2608def create_buffer(size,env_dict=None,*,prioritized = False,**kwargs): 

2609 r"""Create specified version of replay buffer 

2610  

2611 Parameters 

2612 ---------- 

2613 size : int 

2614 Buffer size 

2615 env_dict : dict of dict, optional 

2616 Dictionary specifying environments. The keys of ``env_dict`` become 

2617 environment names. The values of ``env_dict``, which are also ``dict``, 

2618 define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to 

2619 ``default_dtype``) 

2620 prioritized : bool, optional 

2621 Whether to create the prioritized version of the replay buffer. The default is ``False``. 

2622  

2623 Returns 

2624 ------- 

2625 ReplayBuffer or PrioritizedReplayBuffer 

2626 Replay Buffer 

2627  

2628 Raises 

2629 ------ 

2630 NotImplementedError 

2631 If the specified replay buffer version is not implemented 

2632  

2633 Notes 

2634 ----- 

2635 Any other keyword arguments are passed to replay buffer constructor. 

2636  

2637 See Also 

2638 -------- 

2639 ReplayBuffer, PrioritizedReplayBuffer 

2640 """ 

2641 per = "Prioritized" if prioritized else "" 

2642  

2643 buffer_name = f"{per}ReplayBuffer" 

2644  

2645 cls={"ReplayBuffer": ReplayBuffer, 

2646 "PrioritizedReplayBuffer": PrioritizedReplayBuffer} 

2647  

2648 buffer = cls.get(buffer_name, None) 

2649  

2650 if buffer: 

2651 return buffer(size,env_dict,**kwargs) 

2652  

2653 raise NotImplementedError(f"{buffer_name} is not Implemented") 

2654  
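# Illustrative usage (not part of the source):
#
#     rb = create_buffer(256, {"obs": {"shape": 4}, "rew": {}, "done": {}},
#                        prioritized=True, alpha=0.7)
#     isinstance(rb, PrioritizedReplayBuffer)  # -> True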

2655  

2656@cython.embedsignature(True) 

2657def train(buffer: ReplayBuffer, 

2658 env, 

2659 get_action: Callable, 

2660 update_policy: Callable,*, 

2661 max_steps: int=int(1e6), 

2662 max_episodes: Optional[int] = None, 

2663 batch_size: int = 64, 

2664 n_warmups: int = 0, 

2665 after_step: Optional[Callable] = None, 

2666 done_check: Optional[Callable] = None, 

2667 obs_update: Optional[Callable] = None, 

2668 rew_sum: Optional[Callable[[float, Any], float]] = None, 

2669 episode_callback: Optional[Callable[[int,int,float],Any]] = None, 

2670 logger = None): 

2671 r""" 

2672 Train RL policy (model) 

2673  

2674 Parameters 

2675 ---------- 

2676 buffer: ReplayBuffer 

2677 Buffer to be used for training 

2678 env: gym.Env compatible 

2679 Environment to learn 

2680 get_action: Callable 

2681 Callable taking ``obs``, ``step``, ``episode``, and ``is_warmup``, and returning ``action`` 

2682 update_policy: Callable 

2683 Callable taking ``sample``, ``step``, and ``episode``, updating policy, 

2684 and returning :math:`|TD|`. 

2685 max_steps: int, optional 

2686 Maximum steps to learn. The default value is ``1000000`` 

2687 max_episodes: int, optional 

2688 Maximum episodes to learn. The defaul value is ``None`` 

2689 n_warmups: int, optional 

2690 Warmup steps before sampling. The default value is ``0`` (No warmup) 

2691 after_step: Callable, optional 

2692 Callable converting from ``obs``, returns of ``env.step(action)``, 

2693 ``step``, and ``episode`` to ``dict`` of a transition for 

2694 ``ReplayBuffer.add``. 

2695 This function can also be used for step summary callback. 

2696 done_check: Callable, optional 

2697 Callable checking done 

2698 obs_update: Callable, optional 

2699 Callable updating obs 

2700 rew_sum: Callable[[float, Dict], float], optional 

2701 Callable summarizing episode reward 

2702 episode_callback: Callable[[int, int, float], Any], optional 

2703 Callable for episode summarization 

2704 logger: logging.Logger, optional 

2705 Custom Logger 

2706  

2707 Raises 

2708 ------ 

2709 ValueError: 

2710 When ``max_steps`` is larger than the ``size_t`` limit 

2711  

2712 Warnings 

2713 -------- 

2714 ``cpprb.train`` is still a beta release. The API may change. 

2715 """ 

2716 warnings.warn("`cpprb.train` is still a beta release. The API may change.") 

2717  

2718 logger = logger or default_logger() 

2719  
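# Assigning -1 to the unsigned ``size_t`` wraps around to its maximum value (SIZE_MAX).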

2720 cdef size_t size_t_limit = -1 

2721 if max_steps >= int(size_t_limit): 

2722 raise ValueError(f"max_steps ({max_steps}) is too big. " + 

2723 f"max_steps < {size_t_limit}") 

2724  

2725 cdef bool use_per = isinstance(buffer,PrioritizedReplayBuffer) 

2726 cdef bool has_after_step = after_step 

2727 cdef bool has_check = done_check 

2728 cdef bool has_obs_update = obs_update 

2729 cdef bool has_rew_sum = rew_sum 

2730 cdef bool has_episode_callback = episode_callback 

2731  

2732 cdef size_t _max_steps = max(max_steps,0) 

2733 cdef size_t _max_episodes = min(max(max_episodes or size_t_limit, 0),size_t_limit) 

2734 cdef size_t _n_warmup = min(max(0,n_warmups),size_t_limit) 

2735  

2736 cdef size_t step = 0 

2737 cdef size_t episode = 0 

2738 cdef size_t episode_step = 0 

2739 cdef float episode_reward = 0.0 

2740 cdef bool is_warmup = True 

2741  

2742 obs = env.reset() 

2743 cdef double episode_start_time = time.perf_counter() 

2744 cdef double episode_end_time = 0.0 

2745 for step in range(_max_steps): 

2746 is_warmup = (step < _n_warmup) 

2747  

2748 # Get action 

2749 action = get_action(obs,step,episode,is_warmup) 

2750  

2751 # Step environment 

2752 if has_after_step: 

2753 transition = after_step(obs,action,env.step(action),step,episode) 

2754 else: 

2755 next_obs, reward, done, _ = env.step(action) 

2756 transition = {"obs": obs, 

2757 "act": action, 

2758 "rew": reward, 

2759 "next_obs": next_obs, 

2760 "done": done} 

2761  

2762 # Add to buffer 

2763 buffer.add(**transition) 

2764  

2765 # For Nstep, ReplayBuffer can be empty after `add(**transition)` method 

2766 if (buffer.get_stored_size() > 0) and (not is_warmup): 

2767 # Sample 

2768 sample = buffer.sample(batch_size) 

2769 absTD = update_policy(sample,step,episode) 

2770  

2771 if use_per: 

2772 buffer.update_priorities(sample["indexes"],absTD) 

2773  

2774 # Summarize reward 

2775 episode_reward = (rew_sum(episode_reward,transition) if has_rew_sum 

2776 else episode_reward + transition["rew"]) 

2777  

2778 # Prepare the next step 

2779 if done_check(transition) if has_check else transition["done"]: 

2780 episode_end_time = time.perf_counter() 

2781  

2782 # step/episode_step are index. 

2783 # Total Steps/Episode Steps are counts. 

2784 SPS = (episode_step+1) / max(episode_end_time-episode_start_time,1e-9) 

2785 logger.info(f"{episode: 6}th Episode: " + 

2786 f"{episode_step+1: 5} Steps " + 

2787 f"({step+1: 7} Total Steps), " + 

2788 f"{episode_reward: =+7.2f} Reward, " + 

2789 f"{SPS: =+5.2f} Steps/s") 

2790  

2791 # Summary 

2792 if has_episode_callback: 

2793 episode_callback(episode,episode_step,episode_reward) 

2794  

2795 # Reset 

2796 obs = env.reset() 

2797 buffer.on_episode_end() 

2798 episode_reward = 0.0 

2799 episode_step = 0 

2800  

2801 # Update episode count 

2802 episode += 1 

2803 if episode >= _max_episodes: 

2804 break 

2805  

2806 episode_start_time = time.perf_counter() 

2807 else: 

2808 obs = obs_update(transition) if has_obs_update else transition["next_obs"] 

2809 episode_step += 1
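# Illustrative usage sketch (not part of the source), assuming a Gym-style
# environment; ``my_policy`` and ``my_update`` are hypothetical user callbacks:
#
#     import gym
#     from cpprb import ReplayBuffer, train
#
#     env = gym.make("CartPole-v1")
#     rb = ReplayBuffer(10000, {"obs": {"shape": 4}, "act": {}, "rew": {},
#                               "next_obs": {"shape": 4}, "done": {}})
#
#     def my_policy(obs, step, episode, is_warmup):
#         return env.action_space.sample()  # random policy placeholder
#
#     def my_update(sample, step, episode):
#         pass  # update the policy from ``sample`` here; return |TD| for PER
#
#     train(rb, env, my_policy, my_update, max_steps=1000, batch_size=32)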